﻿using log4net;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using System.Net.WebSockets;
using System.Text;
using System.IO;

namespace zbus
{
    /// <summary>
    /// String-keyed sorted dictionary with a lenient indexer: reading a missing key
    /// yields <c>null</c> instead of throwing, and assigning to an existing key
    /// overwrites the stored value.
    /// </summary>
    public class Dict : SortedDictionary<string, object>
    {
        /// <summary>
        /// Gets or sets the value stored under <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Key to look up or assign.</param>
        /// <returns>The stored value, or <c>null</c> when the key is absent.</returns>
        public new object this[string key]
        {
            get
            {
                object value;
                return TryGetValue(key, out value) ? value : null;
            }
            // Use the base indexer rather than Add(): Add() throws ArgumentException
            // when the key already exists, which made plain re-assignment impossible.
            set { base[key] = value; }
        }
    }
    /// <summary>
    /// Message exchanged over the socket: optional URL, method and status,
    /// a header dictionary and an arbitrary body.
    /// </summary>
    public class Message
    {
        public string Url { get; set; }
        public string Method { get; set; }
        public int? Status { get; set; }
        public Dict Headers = new Dict();
        public object Body { get; set; }

        /// <summary>
        /// Overwrites every member of this message with the corresponding member of
        /// <paramref name="msg"/>. The header dictionary and body are assigned by
        /// reference, not copied.
        /// </summary>
        /// <param name="msg">Message whose contents are taken over.</param>
        public void Replace(Message msg)
        {
            Url = msg.Url;
            Method = msg.Method;
            Status = msg.Status;
            Headers = msg.Headers;
            Body = msg.Body;
        }
    }

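    /// <summary>
    /// WebSocket client that sends and receives JSON-serialized <see cref="Message"/> objects.
    /// It reconnects when the socket is not open, matches replies to pending requests by
    /// message id, and can send a periodic heartbeat message.
    /// </summary>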
    public class WebsocketClient : IDisposable
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(WebsocketClient)); 
        // Delay, in milliseconds, awaited between heartbeat sends (see Heartbeat()).
        public int HeartbeatInterval { get; set; } = 30000; //30s
        // Delay, in milliseconds, awaited after a send/receive failure before checking
        // the connection again and reconnecting if it is still not open.
        public int ReconnectingTimeoutMs { get; set; } = 3000; //3 seconds by default
        // When true, every outgoing message is signed with ApiKey/SecretKey before serialization.
        public bool AuthEnabled { get; set; } = false;
        public string ApiKey { get; set; }
        public string SecretKey { get; set; }

        // Set by Dispose(); Reconnect() returns immediately once this is true.
        private bool _disposing = false;

        // Server address; the constructor prefixes "ws://" when no ws/wss scheme is given.
        protected readonly string address;
        // Current socket; replaced with a new instance by ConnectAsync().
        protected ClientWebSocket _client;

        // Raised by ConnectAsync() after a connection attempt completes without an exception.
        public event Action<WebsocketClient> OnOpen;
        // Raised by ConnectAsync() when the connection attempt throws.
        public event Action<WebsocketClient> OnError;
        // Raised when a Close frame is received and when CloseInternal() tears the socket down.
        public event Action<WebsocketClient> OnClose;
        // Raised by the listen loop for received messages that match no pending callback.
        public event Action<Message> OnMessage;

        // Default hook run on a message before sending when the caller supplies none.
        public Action<Message> BeforeSend;
        // Hook run on every text message right after it is deserialized.
        public Action<Message> AfterRecv; 
        
        // Pending reply callbacks keyed by message id: InvokeAsync() registers them,
        // the listen loop removes and invokes them.
        protected ConcurrentDictionary<string, Action<Message>> callbackTable = new ConcurrentDictionary<string, Action<Message>>();

        // One semaphore per operation: connect, receive, send, and callback-table access.
        private SemaphoreSlim _connectingLocker = new SemaphoreSlim(1);
        private SemaphoreSlim readLocker = new SemaphoreSlim(1);
        private SemaphoreSlim writeLocker = new SemaphoreSlim(1);
        private SemaphoreSlim callbackLocker = new SemaphoreSlim(1);

        //private Task _sendingTask = null;
        // Held for the duration of Reconnect() and CloseInternal() respectively.
        private SemaphoreSlim _reconnectingLocker = new SemaphoreSlim(1);
        private SemaphoreSlim _closeLocker = new SemaphoreSlim(1);

        // Cancelled by Dispose(); replaced with a fresh source on every Reconnect().
        private CancellationTokenSource _cancelation = new CancellationTokenSource();

        // Thread running the listen loop; created once by ConnectAsync().
        private Thread recvThread; 
        // Thread created once by Heartbeat() to start the heartbeat loop.
        private Thread heartbeatThread;
        // Message sent as heartbeat; Heartbeat() does nothing while this is null.
        // NOTE(review): nothing in this file assigns it — presumably set by subclasses; confirm.
        protected Message heartbeatMessage = null;

        //private int iCount = 0;

        /// <summary>
        /// Creates a client for the given server address. No connection is opened here.
        /// </summary>
        /// <param name="address">
        /// Server address. "ws://" is prepended unless the address already starts with
        /// a ws:// or wss:// scheme (compared case-insensitively).
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="address"/> is null.</exception>
        public WebsocketClient(string address)
        {
            if (address == null)
            {
                throw new ArgumentNullException(nameof(address));
            }

            // Ordinal comparison: a URI scheme is not linguistic text, and the default
            // StartsWith(string) overload is culture-sensitive. Ignoring case keeps
            // "WS://host" from getting a second scheme prepended.
            bool hasScheme =
                address.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) ||
                address.StartsWith("wss://", StringComparison.OrdinalIgnoreCase);

            this.address = hasScheme ? address : "ws://" + address;
        }

        /// <summary>
        /// True when a socket exists and its state is <see cref="WebSocketState.Open"/>.
        /// </summary>
        public bool Active => _client?.State == WebSocketState.Open;

        /// <summary>
        /// Marks the client as disposing, cancels the current cancellation source
        /// and disposes the underlying socket.
        /// </summary>
        public void Dispose()
        {
            // Raise the flag first so that Reconnect() becomes a no-op from here on.
            _disposing = true;

            _cancelation.Cancel();
            CloseConnection();
        }

        /// <summary>
        /// Opens the WebSocket connection if it is not already open and, on the first
        /// call that gets that far, starts the listener thread and the heartbeat.
        /// Connection failures are logged and reported through <c>OnError</c>; they are
        /// not rethrown.
        /// </summary>
        /// <param name="token">
        /// Token passed to the socket connect call; <c>CancellationToken.None</c> when omitted.
        /// </param>
        public async Task ConnectAsync(CancellationToken? token = null)
        {
            logger.Debug("Internel connecting ..");

            // Acquire before entering the try block so that the finally block can never
            // release a semaphore that was not taken.
            await _connectingLocker.WaitAsync();
            try
            {
                if (Active) return;

                this._client = new ClientWebSocket();
                await this._client.ConnectAsync(new Uri(this.address), token ?? CancellationToken.None);

                if (_client.State == WebSocketState.Connecting)
                {
                    logger.Info(($"ConnectInternal: websocket is connecting : state {_client.State}"));
                    await Task.Delay(1000);
                }

                //If no error connected event triggered
                logger.Info(($"ConnectInternal: Connected to {address}, websocket connected: state {_client.State}"));
                OnOpen?.Invoke(this);
            }
            catch (Exception e)
            {
                // Log the innermost exception, which carries the root cause.
                while (e.InnerException != null)
                {
                    e = e.InnerException;
                }
                OnError?.Invoke(this);
                logger.Error(e);
            }
            finally
            {
                _connectingLocker.Release();
            }

            //Trying to start recv thread if not start yet
            if (this.recvThread != null) return;

            await _connectingLocker.WaitAsync();
            try
            {
                // Re-check under the lock: two callers can both pass the unlocked check
                // above, and without this guard each would start its own listener thread.
                if (this.recvThread != null) return;

                this.recvThread = new Thread(() =>
                {
                    Listen().Wait();
                });
                this.recvThread.IsBackground = false;
                this.recvThread.Start();

                this.Heartbeat();
            }
            finally
            {
                _connectingLocker.Release();
            }
        }

        /// <summary>
        /// Returns the current socket, attempting a reconnect first when there is no
        /// socket or it is not open.
        /// </summary>
        /// <returns>The socket after the optional reconnect attempt; may still be null or not open.</returns>
        private async Task<ClientWebSocket> GetClient()
        {
            ClientWebSocket current = _client;
            if (current == null || current.State != WebSocketState.Open)
            {
                // This branch is also taken when there is no socket at all, so every
                // access is null-conditional; dereferencing _client directly threw
                // NullReferenceException before the reconnect could run.
                logger.Error($"GetClient error, State[{current?.State}] CloseStatus[{current?.CloseStatus} : {current?.CloseStatusDescription}]");

                await Reconnect();
            }
            return _client;
        }

        /// <summary>
        /// Tears down the current socket and connects again, unless the client is being
        /// disposed or the connection turns out to be open once the reconnect lock is held.
        /// </summary>
        private async Task Reconnect()
        {
            if (_disposing)
            {
                return;
            }

            try
            {
                logger.Warn(("Reconnecting..."));
                await _reconnectingLocker.WaitAsync();

                // Another caller may have restored the connection while we waited.
                if (!Active)
                {
                    await CloseInternal();

                    // A fresh source: the previous one may already have been cancelled.
                    _cancelation = new CancellationTokenSource();
                    await ConnectAsync(_cancelation.Token);
                }
            }
            finally
            {
                _reconnectingLocker.Release();
            }
        }

        /// <summary>
        /// Receive loop: reads messages until cancellation is requested, hands each one
        /// to the callback registered for its id or, when there is none, to
        /// <c>OnMessage</c>. After a failure it waits <c>ReconnectingTimeoutMs</c> and
        /// reconnects if the socket is still not open.
        /// </summary>
        private async Task Listen()
        {
            logger.Info(("Listening..."));

            while (!_cancelation.IsCancellationRequested)
            {
                logger.Debug(("Listen receiving..."));
                try
                {
                    Message res = await RecvAsync(_cancelation.Token);
                    if (res == null)
                    {
                        continue;
                    }

                    if (!await TryDispatchToCallback(res))
                    {
                        OnMessage?.Invoke(res);
                    }
                }
                catch (TaskCanceledException e)
                {
                    logger.Error($"Listen ReceiveAsync TaskCanceledException : {e}.");
                    return;
                }
                catch (Exception e)
                {
                    // Report the innermost exception, which carries the root cause.
                    while (e.InnerException != null)
                    {
                        e = e.InnerException;
                    }

                    // Socket failures and all other failures are recovered the same way;
                    // only the wording of the log entries differs.
                    string label = e is SocketException ? "WebSocketException" : "exception";

                    logger.Error($"Listen ReceiveAsync {label} : {e}.");
                    await Task.Delay(ReconnectingTimeoutMs);
                    if (Active)
                    {
                        continue;
                    }
                    await Reconnect();
                    logger.Error($"Listen ReceiveAsync {label}, reconneted.");
                }
            }
        }

        /// <summary>
        /// Removes and invokes the callback registered for the message id, if any.
        /// </summary>
        /// <param name="res">Received message.</param>
        /// <returns>
        /// True when a callback was found and invoked; false when the message has no
        /// string id or no callback is registered for it.
        /// </returns>
        private async Task<bool> TryDispatchToCallback(Message res)
        {
            // A message without an id cannot belong to a pending request. Passing a null
            // key to ConcurrentDictionary.TryRemove throws, which used to divert such
            // messages into the error path instead of OnMessage.
            string id = res.Headers?[Protocol.ID] as string;
            if (id == null)
            {
                return false;
            }

            await callbackLocker.WaitAsync();
            try
            {
                Action<Message> cb;
                if (!callbackTable.TryRemove(id, out cb))
                {
                    return false;
                }

                try
                {
                    cb(res);
                }
                catch (Exception e)
                {
                    // A failing callback must not take the receive loop down.
                    logger.Error(e.Message, e);
                }
                return true;
            }
            finally
            {
                callbackLocker.Release();
            }
        }

        /// <summary>
        /// Sends a request and waits for the reply carrying the same message id.
        /// </summary>
        /// <param name="req">Request to send; its id header is overwritten with a new GUID.</param>
        /// <param name="timeoutMillis">Maximum time to wait for the reply, in milliseconds.</param>
        /// <param name="beforeSend">Optional hook run on the request before it is sent.</param>
        /// <returns>The reply, or <c>null</c> when none arrived within the timeout.</returns>
        public async Task<Message> InvokeAsync(Message req, int timeoutMillis=5000, Action<Message> beforeSend = null)
        {
            string id = Guid.NewGuid().ToString();
            req.Headers[Protocol.ID] = id;

            Message res = null;

            // Signalled by the reply callback. A semaphore can be awaited with a timeout,
            // so no thread is blocked while waiting and no wait handle is allocated.
            var replied = new SemaphoreSlim(0, 1);

            await callbackLocker.WaitAsync();
            try
            {
                callbackTable.TryAdd(id, (msg) =>
                {
                    res = msg;
                    replied.Release();
                });
            }
            catch (Exception e)
            {
                logger.ErrorFormat("InvokeAsync error count[{0}] - id[{1}] - {2}.", callbackTable.Count, id, e);
            }
            finally
            {
                callbackLocker.Release();
            }

            bool answered = false;
            try
            {
                await SendAsync(req, beforeSend);
                answered = await replied.WaitAsync(timeoutMillis);
            }
            finally
            {
                // Runs on timeout and also when sending throws, so an unanswered
                // request never leaves its callback behind in the table.
                if (!answered)
                {
                    await callbackLocker.WaitAsync();
                    try
                    {
                        callbackTable.TryRemove(id, out _);
                    }
                    finally
                    {
                        callbackLocker.Release();
                    }
                }
            }
            return res;
        }

        /// <summary>
        /// Sends a message, reconnecting first when the socket is not open. Sends are
        /// serialized through the write lock. If the socket is still unusable after the
        /// reconnect attempt, the message is dropped without an error.
        /// </summary>
        /// <param name="req">Message to send.</param>
        /// <param name="beforeSend">Optional hook run on the message before it is sent.</param>
        /// <param name="token">Token forwarded to the underlying send.</param>
        public async Task SendAsync(Message req, Action<Message> beforeSend = null, CancellationToken ? token = null)
        {
            if (!Active)
            {
                await Reconnect();
            }

            try
            {
                await writeLocker.WaitAsync();

                ClientWebSocket socket = await GetClient();
                bool usable = socket != null && Active;
                if (usable)
                {
                    await SendUnsafeAsync(req, beforeSend, token);
                }
            }
            finally
            {
                writeLocker.Release();
            }
        }
        /// <summary>
        /// Runs the before-send hook, optionally signs the message, serializes it to JSON
        /// and writes it to the socket as one UTF-8 text frame. Takes no lock itself.
        /// A failing send is logged and followed by a delayed reconnect check; the
        /// exception is not rethrown.
        /// </summary>
        /// <param name="req">Message to send.</param>
        /// <param name="beforeSend">Hook to run on the message; falls back to <c>BeforeSend</c> when null.</param>
        /// <param name="token">Token for the socket send; <c>CancellationToken.None</c> when omitted.</param>
        protected async Task SendUnsafeAsync(Message req, Action<Message> beforeSend = null, CancellationToken ? token = null)
        {
            CancellationToken sendToken = token ?? CancellationToken.None;

            Action<Message> hook = beforeSend ?? BeforeSend;
            hook?.Invoke(req);

            if (AuthEnabled)
            {
                Auth.Sign(ApiKey, SecretKey, req);
            }

            string json = JsonKit.SerializeObject(req);
            try
            {
                byte[] payload = Encoding.UTF8.GetBytes(json);
                await _client.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Text, true, sendToken);
            }
            catch (Exception e)
            {
                logger.Error(("Exception while SendAsync."), e);
                await Task.Delay(ReconnectingTimeoutMs);

                // Reconnect only if the connection has not come back in the meantime.
                if (!Active)
                {
                    await Reconnect();
                }
            }
        }  

        /// <summary>
        /// Receives one message, reconnecting first when the socket is not open.
        /// Receives are serialized through the read lock.
        /// </summary>
        /// <param name="token">Token forwarded to the underlying receive.</param>
        /// <returns>Whatever <c>RecvUnsafeAsync</c> returns for the next frame.</returns>
        protected async Task<Message> RecvAsync(CancellationToken? token = null)
        {
            if (!Active)
            {
                await Reconnect();
            }

            try
            {
                await readLocker.WaitAsync();

                Message received = await RecvUnsafeAsync(token);
                return received;
            }
            finally
            {
                readLocker.Release();
            }
        }

        /// <summary>
        /// Reads one complete WebSocket message from the socket. Takes no lock itself.
        /// </summary>
        /// <param name="token">Token for the socket receive; <c>CancellationToken.None</c> when omitted.</param>
        /// <returns>
        /// The deserialized message for a text frame; <c>null</c> for a binary frame
        /// (the payload is discarded) and for a close frame.
        /// </returns>
        /// <exception cref="NotSupportedException">The frame type is none of text, binary or close.</exception>
        protected async Task<Message> RecvUnsafeAsync(CancellationToken? token = null)
        {
            CancellationToken receiveToken = token ?? CancellationToken.None;

            var buffer = new ArraySegment<byte>(new byte[8192]);
            using (var ms = new MemoryStream())
            {
                // A message may span several frames; accumulate until the final one.
                WebSocketReceiveResult data;
                do
                {
                    data = await _client.ReceiveAsync(buffer, receiveToken);
                    ms.Write(buffer.Array, buffer.Offset, data.Count);
                }
                while (!data.EndOfMessage);

                switch (data.MessageType)
                {
                    case WebSocketMessageType.Text:
                        ms.Seek(0, SeekOrigin.Begin);
                        using (var reader = new StreamReader(ms, Encoding.UTF8))
                        {
                            var received = reader.ReadToEnd();

                            logger.DebugFormat("processing {0}.", received);
                            Message res = JsonKit.DeserializeObject<Message>(received);
                            AfterRecv?.Invoke(res);
                            return res;
                        }

                    case WebSocketMessageType.Binary:
                        // Binary payloads are not interpreted. The former copy into a
                        // temporary array was never used and has been removed.
                        return null;

                    case WebSocketMessageType.Close:
                        logger.Error(($"Closing ... reason {_client.CloseStatusDescription}"));
                        // Answer the peer's close frame to complete the close handshake.
                        await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
                        OnClose?.Invoke(this);
                        return null;

                    default:
                        logger.Error(($"Listen NotSupportedException: meesageType:{data.MessageType} state:{_client.State}."));
                        throw new NotSupportedException();
                }
            }
        }

        /// <summary>
        /// Disposes the current socket, if there is one, logging entry and exit.
        /// </summary>
        private void CloseConnection()
        {
            logger.Debug("Enter CloseConnection");
            _client?.Dispose();
            logger.Debug("Exit CloseConnection");
        }

        // Guards one-time creation of the heartbeat thread. A private object is used
        // instead of lock(this) so that code outside this class cannot contend on it.
        private readonly object _heartbeatGate = new object();

        /// <summary>
        /// Starts the heartbeat loop once. Does nothing when no heartbeat message is
        /// configured or when the loop has already been started.
        /// </summary>
        public void Heartbeat()
        {
            if (heartbeatMessage == null) return;
            if (this.heartbeatThread != null) return;
            lock (_heartbeatGate)
            {
                if (this.heartbeatThread != null) return;

                // The thread only kicks the loop off: the loop is asynchronous and
                // carries on after its first await while this thread ends.
                this.heartbeatThread = new Thread(() =>
                {
                    Task pending = HeartbeatLoopAsync();
                });
                this.heartbeatThread.Start();
            }
        }

        /// <summary>
        /// Until cancellation is requested: waits <c>HeartbeatInterval</c> milliseconds,
        /// then sends the heartbeat message if the connection is open.
        /// </summary>
        /// <remarks>
        /// Returns a Task rather than being an async void lambda, so an unexpected
        /// exception is logged here instead of being raised as unhandled.
        /// </remarks>
        private async Task HeartbeatLoopAsync()
        {
            try
            {
                while (!_cancelation.IsCancellationRequested)
                {
                    await Task.Delay(this.HeartbeatInterval);
                    try
                    {
                        if (this.Active)
                        {
                            await SendAsync(heartbeatMessage, BeforeSend);
                        }
                    }
                    catch (Exception e)
                    {
                        // Best effort: one failed heartbeat must not stop the loop,
                        // but it should leave a trace in the log.
                        logger.Warn("Heartbeat send failed.", e);
                    }
                }
            }
            catch (Exception e)
            {
                logger.Error("Heartbeat loop terminated unexpectedly.", e);
            }
        }

        /// <summary>
        /// Under the close lock: waits one second, aborts and disposes the current
        /// socket if there is one, then raises <c>OnClose</c> on a thread-pool task
        /// without waiting for the handlers to finish.
        /// </summary>
        public async Task CloseInternal()
        {
            logger.Debug("Internel closing ..");
            try
            {
                await _closeLocker.WaitAsync();

                await Task.Delay(1000);
                if (_client != null)
                {
                    _client.Abort();
                }
                if (_client != null)
                {
                    _client.Dispose();
                }

                // Fire and forget: handlers run on the thread pool.
#pragma warning disable 4014
                Task.Run(() => OnClose?.Invoke(this));
#pragma warning restore 4014

                logger.Debug("Internel closed.");
            }
            finally
            {
                _closeLocker.Release();
            }
        }

    }
}
